package com.lcc.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lcc.realtime.app.BaseAppV1;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static com.lcc.realtime.common.Constant.*;

/**
 * Streaming job that reads raw database change records from {@code TOPIC_ODS_DB}
 * and cleans them before further processing.
 *
 * <p>Only the cleaning step ({@link #etl}) is implemented so far; the remaining
 * steps are listed as TODOs in {@link #handle}.
 */
public class DwdDbApp extends BaseAppV1 {
    /** Only change records that belong to this source database are kept. */
    private static final String SOURCE_DATABASE = "gmall2022";

    /**
     * Starts the job by delegating to {@code init} of {@link BaseAppV1}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the meaning of each argument is defined by BaseAppV1.init,
        // which is not visible here — presumably job name, port, parallelism,
        // checkpoint name, consumer group and source topic; confirm against the base class.
        new DwdDbApp().init(
                "DwdDbApp",
                10002,
                1,
                "DwdDbApp",
                "DwdDbApp",
                TOPIC_ODS_DB
        );
    }

    /**
     * Builds the processing pipeline on top of the source stream.
     *
     * @param env    the stream execution environment
     * @param stream the raw records, one JSON string per element
     */
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        // Step 1: clean the raw data. The result is not consumed yet because the
        // following steps are still to be implemented.
        etl(stream);
        // TODO: read the configuration table data

        // TODO: connect the business stream with the configuration stream

        // TODO: write the data to the respective topics
    }

    /**
     * Parses every raw record and keeps only well-formed insert/update records of
     * the source database that carry a non-empty payload.
     *
     * <p>Records that cannot be parsed into a JSON object are dropped instead of
     * failing the job.
     *
     * @param stream the raw records, one JSON string per element
     * @return the stream of parsed records that passed the checks
     */
    private SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> stream) {
        return stream
                .map(DwdDbApp::parseOrEmpty)
                .filter(DwdDbApp::isUsableChange);
    }

    /**
     * Parses {@code text} as a JSON object.
     *
     * @param text the raw record
     * @return the parsed object, or an empty {@link JSONObject} when the text is not
     *         a JSON object; the empty object never passes {@link #isUsableChange}
     */
    private static JSONObject parseOrEmpty(String text) {
        try {
            JSONObject parsed = JSON.parseObject(text);
            // A null result must not be emitted: the downstream filter dereferences the record.
            return parsed != null ? parsed : new JSONObject();
        } catch (RuntimeException ignored) {
            // Dirty input (malformed JSON, or a value that is not a JSON object):
            // cleaning means discarding it, so hand the filter a record it rejects.
            return new JSONObject();
        }
    }

    /**
     * Decides whether a parsed record should be kept.
     *
     * @param obj the parsed record, never {@code null}
     * @return {@code true} when the record belongs to the source database, names a
     *         table, is an insert or update, and has a "data" value longer than two
     *         characters
     */
    private static boolean isUsableChange(JSONObject obj) {
        if (!SOURCE_DATABASE.equals(obj.getString("database"))) {
            return false;
        }
        if (obj.getString("table") == null) {
            return false;
        }
        String type = obj.getString("type");
        if (!"insert".equals(type) && !"update".equals(type)) {
            return false;
        }
        // "data" may be absent; the null check avoids the NullPointerException the
        // unguarded length() call would raise. A length of two or less is treated as
        // an empty payload (presumably "{}" — confirm against the producer's format).
        String data = obj.getString("data");
        return data != null && data.length() > 2;
    }
}
